library(modeltime)
library(dplyr)
library(EIAapi)
library(jsonlite)
library(gt)
library(plotly)
library(lubridate)
library(modeltime)
# Project helpers: EIA API data pull and backtesting utilities.
source("../pipeline/eia_data.R")
source("../pipeline/backtesting.R")

Data Refresh
Load libraries
API Settings:
# Load the pipeline settings and flatten the nested series definitions
# into one data frame (one row per parent/sub-region pair).
meta_json <- read_json(path = "../settings/settings.json")
s <- meta_json$series
# seq_along() is safe when the list is empty (unlike 1:length(s)).
series <- lapply(seq_along(s), function(i) {
  data.frame(
    parent_id = s[[i]]$parent_id,
    parent_name = s[[i]]$parent_name,
    subba_id = s[[i]]$subba_id,
    subba_name = s[[i]]$subba_name
  )
}) |>
  bind_rows()
# Template for the EIA API `facets` argument; the parent/subba slots are
# filled in per series inside the refresh loop.
facets_template <- list(
  parent = NULL,
  subba = NULL
)

# API credentials and file paths from the settings file.
eia_api_key <- Sys.getenv("EIA_API_KEY")
api_path <- meta_json$api_path
meta_path <- meta_json$meta_path
data_path <- meta_json$data_path
forecast_path <- meta_json$forecast_path
forecast_log_path <- meta_json$forecast_log_path
calibrated_models_path <- meta_json$calibrated_models_path

# Backtesting / feature-engineering settings.
h <- meta_json$backtesting$h
lags <- meta_json$backtesting$features$lags |> unlist()
train_length <- meta_json$train_length
offset <- meta_json$offset
tz <- meta_json$timezone

# Pull the series metadata (last refresh state per series) via the helpers.
meta_obj <- get_metadata(api_key = eia_api_key, api_path = api_path, meta_path = meta_path, series = series)
# Display the per-series request metadata.
gt(meta_obj$request_meta)

| parent | subba | end_act | request_start | end | updates_available |
|---|---|---|---|---|---|
| CISO | PGAE | 2024-07-11 07:00:00 | 2024-07-11 08:00:00 | 2024-07-11 07:00:00 | FALSE |
| CISO | SCE | 2024-07-11 07:00:00 | 2024-07-11 08:00:00 | 2024-07-11 07:00:00 | FALSE |
| CISO | SDGE | 2024-07-11 07:00:00 | 2024-07-11 08:00:00 | 2024-07-11 07:00:00 | FALSE |
| CISO | VEA | 2024-07-11 07:00:00 | 2024-07-11 08:00:00 | 2024-07-11 07:00:00 | FALSE |
m <- meta_obj$request_meta
index <- meta_obj$last_index + 1
data <- NULL
meta_new <- NULL
# Refresh loop: for each parent/subba pair, pull any new observations,
# regularize them to an hourly grid, append them to the local data file,
# and accumulate one metadata row per series describing the run.
for (i in seq_len(nrow(m))) {
  facets <- facets_template
  facets$parent <- m$parent[i]
  facets$subba <- m$subba[i]
  start <- m$request_start[i]
  end <- m$end[i]
  print(paste(facets$parent, facets$subba, sep = " - "))
  if (m$updates_available[i]) {
    # Pad the request window by a day on each side and trim afterwards,
    # so edge hours are not lost to API paging/timezone effects.
    temp <- eia_backfill(
      start = start - lubridate::hours(24),
      end = end + lubridate::hours(24),
      offset = offset,
      api_key = eia_api_key,
      api_path = paste0(api_path, "data"),
      facets = facets
    ) |> dplyr::filter(time >= start & time <= end)
    # Regularize to a full hourly index; missing hours become NA rows.
    # Use a loop-local name so the script-level `index` is not clobbered.
    time_index <- seq.POSIXt(from = start, to = end, by = "hour")
    ts_obj <- data.frame(period = time_index) |>
      left_join(temp, by = c("period" = "time"))
  } else {
    ts_obj <- NULL
    print("No new data is available")
  }
  meta_temp <- create_metadata(data = ts_obj, start = start, end = end, type = "refresh")
  if (is.null(ts_obj)) {
    # create_metadata cannot infer the series labels from a NULL object.
    meta_temp$parent <- m$parent[i]
    meta_temp$subba <- m$subba[i]
  }
  if (meta_temp$success) {
    print("Append the new data")
    d <- append_data(data_path = data_path, new_data = ts_obj, save = TRUE)
    meta_temp$update <- TRUE
  } else {
    meta_temp$update <- FALSE
    meta_temp$comments <- paste0(meta_temp$comments, "The data refresh failed, please check the log; ")
  }
  meta_temp$index <- NA
  meta_df <- as.data.frame(meta_temp)
  if (!is.null(ts_obj)) {
    data <- bind_rows(data, ts_obj)
  }
  meta_new <- bind_rows(meta_new, meta_df)
}

[1] "CISO - PGAE"
[1] "No new data is available"
[1] "CISO - SCE"
[1] "No new data is available"
[1] "CISO - SDGE"
[1] "No new data is available"
[1] "CISO - VEA"
[1] "No new data is available"
# Display the metadata rows produced by the refresh loop.
gt(meta_new)

| index | parent | subba | time | start | end | start_act | end_act | start_match | end_match | n_obs | na | type | update | success | comments |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| NA | CISO | PGAE | 2024-07-12 03:12:28.801059 | 2024-07-11 08:00:00 | 2024-07-11 07:00:00 | NA | NA | NA | NA | NA | NA | refresh | FALSE | FALSE | No new data is available; The data refresh failed, please check the log; |
| NA | CISO | SCE | 2024-07-12 03:12:28.802846 | 2024-07-11 08:00:00 | 2024-07-11 07:00:00 | NA | NA | NA | NA | NA | NA | refresh | FALSE | FALSE | No new data is available; The data refresh failed, please check the log; |
| NA | CISO | SDGE | 2024-07-12 03:12:28.804247 | 2024-07-11 08:00:00 | 2024-07-11 07:00:00 | NA | NA | NA | NA | NA | NA | refresh | FALSE | FALSE | No new data is available; The data refresh failed, please check the log; |
| NA | CISO | VEA | 2024-07-12 03:12:28.805611 | 2024-07-11 08:00:00 | 2024-07-11 07:00:00 | NA | NA | NA | NA | NA | NA | refresh | FALSE | FALSE | No new data is available; The data refresh failed, please check the log; |
# Append the run metadata to the metadata log on disk.
meta_updated <- append_metadata(meta_path = meta_path, new_meta = meta_new, save = TRUE, init = FALSE)

[1] "Saving the metadata file"
Plot the Series
We will use Plotly to visualize the series:
# Visualize the newly refreshed observations (if any), one line per
# sub-region, sorted by series and time.
if (is.null(data)) {
  print("No new data is available")
} else {
  d <- data |> arrange(subba, period)
  p <- plot_ly(d, x = ~period, y = ~value, color = ~subba, type = "scatter", mode = "lines")
  p
}

[1] "No new data is available"
# Reload the full observation history from disk with explicit column
# types (avoids readr's type guessing drifting between runs).
data <- readr::read_csv(file = data_path, col_types = readr::cols(
  period = readr::col_datetime(format = ""),
  subba = readr::col_character(),
  subba_name = readr::col_character(),
  parent = readr::col_character(),
  parent_name = readr::col_character(),
  value = readr::col_double(),
  value_units = readr::col_character()
))
head(data)

# A tibble: 6 x 8
period subba subba_name parent parent_name value value_units
<dttm> <chr> <chr> <chr> <chr> <dbl> <chr>
1 2018-07-01 08:00:00 PGAE Pacific Gas an~ CISO California~ 12522 megawattho~
2 2018-07-01 09:00:00 PGAE Pacific Gas an~ CISO California~ 11745 megawattho~
3 2018-07-01 10:00:00 PGAE Pacific Gas an~ CISO California~ 11200 megawattho~
4 2018-07-01 11:00:00 PGAE Pacific Gas an~ CISO California~ 10822 megawattho~
5 2018-07-01 12:00:00 PGAE Pacific Gas an~ CISO California~ 10644 megawattho~
6 2018-07-01 13:00:00 PGAE Pacific Gas an~ CISO California~ 10559 megawattho~
# i 1 more variable: type <chr>
# Plot the full history, one line per sub-region.
p <- plot_ly(data, x = ~period, y = ~value, color = ~subba, type = "scatter", mode = "lines")
p

Refresh the forecast
# Forecast-refresh settings. NOTE(review): `train_length` overrides the
# value read from settings.json above — confirm this is intentional.
fc <- NULL
index <- "period"
var <- "value"
train_length <- 24 * 31 * 25
init <- FALSE
save <- TRUE
seasonal <- TRUE
trend <- TRUE
# Keep only the series id, the time index, and the target (renamed `y`).
input <- data |>
  dplyr::select(subba, !!rlang::sym(index), y = !!rlang::sym(var))
# Latest observed timestamp per sub-region.
input_last_point <- input |>
  dplyr::group_by(subba) |>
  dplyr::filter(!!rlang::sym(index) == max(!!rlang::sym(index))) |>
  dplyr::ungroup() |>
  dplyr::select(subba, last_time = !!rlang::sym(index))
# Load the forecast log and, for each calibrated model, attach the end
# time of its most recent successful forecast plus the last observed
# data point, to decide whether a forecast refresh is due.
log <- load_forecast_log(forecast_log_path = forecast_log_path)
calibrated_models <- readRDS(calibrated_models_path) |>
dplyr::left_join(
# Most recent successful forecast per sub-region (by its `end` time).
log |>
dplyr::filter(success) |>
dplyr::group_by(subba) |>
dplyr::filter(end == max(end)) |>
dplyr::select(subba, method, end),
by = c("subba", "method")
) |>
dplyr::left_join(input_last_point, by = "subba") |>
# Cutoff: 23:00 of the day before the last observation's date.
dplyr::mutate(end_filter = lubridate::floor_date(last_time, unit = "day") - lubridate::hours(1)) |>
# NOTE(review): floor_date(last_time, "day") - 1h is always earlier than
# last_time, so `refresh` appears to be TRUE unconditionally — confirm
# the comparison was not meant to be against `end` instead.
dplyr::mutate(refresh = ifelse(end_filter < last_time, TRUE, FALSE))
head(log)

 index subba model method time forecast_label start
1 1 PGAE LM model6 2024-07-12 02:45:19 2024-07-08 2024-07-08
2 2 SCE GLMNET model18 2024-07-12 02:45:19 2024-07-08 2024-07-08
3 3 SDGE LM model7 2024-07-12 02:45:19 2024-07-08 2024-07-08
4 4 VEA LM model5 2024-07-12 02:45:19 2024-07-08 2024-07-08
end h n_obs n_obs_flag na_flag success score mape
1 2024-07-08 23:00:00 24 24 TRUE FALSE TRUE TRUE 0.04733980
2 2024-07-08 23:00:00 24 24 TRUE FALSE TRUE TRUE 0.04590319
3 2024-07-08 23:00:00 24 24 TRUE FALSE TRUE TRUE 0.07076818
4 2024-07-08 23:00:00 24 24 TRUE FALSE TRUE TRUE 0.05896550
rmse coverage
1 773.92239 1.0000000
2 900.23786 1.0000000
3 184.66469 0.8333333
4 15.02373 0.7916667
head(calibrated_models)

# Modeltime Table
# A tibble: 4 x 12
.model_id .model .model_desc .type .calibration_data method partition subba
<int> <named l> <chr> <chr> <list> <chr> <int> <chr>
1 6 <fit[+]> LM Test <tibble [24 x 4]> model6 20 PGAE
2 17 <fit[+]> GLMNET Test <tibble [24 x 4]> model~ 20 SCE
3 7 <fit[+]> LM Test <tibble [24 x 4]> model7 20 SDGE
4 5 <fit[+]> LM Test <tibble [24 x 4]> model5 20 VEA
# i 4 more variables: end <dttm>, last_time <dttm>, end_filter <dttm>,
# refresh <lgl>
# Forecast refresh call (kept commented out for this run).
# fc <- refresh_forecast(
#   input = data,
#   forecast_log_path = forecast_log_path,
#   forecast_path = forecast_path,
#   calibrated_models_path = calibrated_models_path,
#   h = h,
#   index = "period",
#   var = "value",
#   train_length = 24 * 31 * 25,
#   lags = lags,
#   init = FALSE,
#   save = TRUE,
#   seasonal = TRUE,
#   trend = TRUE
# )

# Plot the refreshed forecast (if one was produced) on top of the last
# three days of actuals.
if (!is.null(fc)) {
  head(fc)
  plot_forecast(
    input = data,
    forecast = fc,
    var = "value",
    index = "period",
    hours = 24 * 3
  )
}

Score the Forecast
# Score any forecasts in the log that have not been evaluated yet:
# compute MAPE, RMSE, and prediction-interval coverage against the
# observed values, then persist the updated log.
fc_log <- load_forecast_log(forecast_log_path = forecast_log_path)
score_rows <- which(!fc_log$score)
if (length(score_rows) == 0) {
  message("All models were scored ")
} else {
  # Renamed from `subba` so the script variable does not shadow the
  # data column of the same name used in the dplyr filters below.
  subba_ids <- unique(fc_log$subba[score_rows])
  fc <- load_forecast(forecast_path = forecast_path)
  for (i in subba_ids) {
    print(i)
    d <- data |> dplyr::filter(subba == i)
    r <- which(fc_log$subba == i & !fc_log$score)
    for (l in r) {
      # Join the forecast with the actuals; rows without an observed
      # value yet are dropped, so scoring uses what is available.
      f <- fc |>
        dplyr::filter(forecast_label == fc_log$forecast_label[l], subba == i) |>
        dplyr::left_join(d |> dplyr::select(time = period, subba, value), by = c("time", "subba")) |>
        dplyr::filter(!is.na(value))
      fc_log$mape[l] <- mean(abs(f$value - f$yhat) / f$value)
      fc_log$rmse[l] <- sqrt(mean((f$value - f$yhat)^2))
      fc_log$coverage[l] <- sum(f$value <= f$upper & f$value >= f$lower, na.rm = TRUE) / nrow(f)
      # Mark as scored only once the full horizon has actuals.
      if (nrow(f) == fc_log$h[l]) {
        fc_log$score[l] <- TRUE
      }
    }
  }
  # Persist the updated log once, instead of rewriting the file on
  # every inner-loop iteration as before.
  write.csv(fc_log, forecast_log_path, row.names = FALSE)
  gt::gt(fc_log[score_rows, ])
}

All models were scored